/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.handler.loader;

import static java.util.stream.Collectors.toList;
import static org.apache.solr.common.params.CommonParams.ID;
import static org.apache.solr.common.params.CommonParams.JSON;
import static org.apache.solr.common.params.CommonParams.PATH;
import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
import static org.apache.solr.common.params.ShardParams._ROUTE_;

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.CollectionUtil;
import org.apache.solr.common.util.ContentStream;
import org.apache.solr.common.util.JsonRecordReader;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.handler.RequestHandlerUtils;
import org.apache.solr.handler.UpdateRequestHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.util.RecordingJSONParser;
import org.noggit.JSONParser;
import org.noggit.JSONParser.ParseException;
import org.noggit.ObjectBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @since solr 4.0
 */
public class JsonLoader extends ContentStreamLoader {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  private static final AtomicBoolean WARNED_ABOUT_INDEX_TIME_BOOSTS = new AtomicBoolean();
  public static final String CHILD_DOC_KEY = "_childDocuments_";

  @Override
  public String getDefaultWT() {
    return JSON;
  }

  @Override
  public void load(
      SolrQueryRequest req,
      SolrQueryResponse rsp,
      ContentStream stream,
      UpdateRequestProcessor processor)
      throws Exception {
    new SingleThreadedJsonLoader(req, rsp, processor).load(req, rsp, stream, processor);
  }

  @SuppressWarnings("unchecked")
  public static SolrInputDocument buildDoc(Map<String, Object> m) {
    SolrInputDocument result = new SolrInputDocument();
    for (Map.Entry<String, Object> e : m.entrySet()) {
      if (mapEntryIsChildDoc(e.getValue())) { // parse child documents
        if (e.getValue() instanceof List<?> value) {
          for (Object o : value) {
            if (o instanceof Map) {
              // retain the value as a list, even if the list contains a single value.
              if (!result.containsKey(e.getKey())) {
                result.setField(e.getKey(), new ArrayList<>(1));
              }
              result.addField(e.getKey(), buildDoc((Map<String, Object>) o));
            }
          }
        } else if (e.getValue() instanceof Map) {
          result.addField(e.getKey(), buildDoc((Map<String, Object>) e.getValue()));
        }
      } else {
        result.setField(e.getKey(), e.getValue());
      }
    }
    return result;
  }

  private static boolean mapEntryIsChildDoc(Object val) {
    if (val instanceof List<?> listVal) {
      if (listVal.size() == 0) return false;
      return listVal.get(0) instanceof Map;
    }
    return val instanceof Map;
  }

  static class SingleThreadedJsonLoader extends ContentStreamLoader {

    protected final UpdateRequestProcessor processor;
    protected final SolrQueryRequest req;
    protected SolrQueryResponse rsp;
    protected JSONParser parser;
    protected final int commitWithin;
    protected final boolean overwrite;

    SingleThreadedJsonLoader(
        SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor processor) {
      this.processor = processor;
      this.req = req;
      this.rsp = rsp;

      commitWithin = req.getParams().getInt(UpdateParams.COMMIT_WITHIN, -1);
      overwrite = req.getParams().getBool(UpdateParams.OVERWRITE, true);
    }

    @Override
    public void load(
        SolrQueryRequest req,
        SolrQueryResponse rsp,
        ContentStream stream,
        UpdateRequestProcessor processor)
        throws Exception {
      try (Reader reader = getReader(stream)) {
        this.processUpdate(reader);
      } catch (ParseException e) {
        throw new SolrException(
            SolrException.ErrorCode.BAD_REQUEST, "Cannot parse provided JSON: " + e.getMessage());
      }
    }

    private Reader getReader(ContentStream stream) throws IOException {
      if (log.isTraceEnabled()) {
        String body = StrUtils.stringFromReader(stream.getReader());
        log.trace("body: {}", body);
        return new StringReader(body);
      }
      return stream.getReader();
    }

    @SuppressWarnings("fallthrough")
    void processUpdate(Reader reader) throws IOException {
      String path = (String) req.getContext().get(PATH);
      if (UpdateRequestHandler.DOC_PATH.equals(path)
          || "false".equals(req.getParams().get("json.command"))) {
        String split = req.getParams().get("split");
        String[] f = req.getParams().getParams("f");
        handleSplitMode(split, f, reader);
        return;
      }
      parser = new JSONParser(reader);
      int ev = parser.nextEvent();
      while (ev != JSONParser.EOF) {

        switch (ev) {
          case JSONParser.ARRAY_START:
            handleAdds();
            break;

          case JSONParser.STRING:
            if (parser.wasKey()) {
              String v = parser.getString();
              if (v.equals(UpdateRequestHandler.ADD)) {
                int ev2 = parser.nextEvent();
                if (ev2 == JSONParser.OBJECT_START) {
                  processor.processAdd(parseAdd());
                } else if (ev2 == JSONParser.ARRAY_START) {
                  handleAdds();
                } else {
                  assertEvent(ev2, JSONParser.OBJECT_START);
                }
              } else if (v.equals(UpdateRequestHandler.COMMIT)) {
                CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);
                cmd.waitSearcher = true;
                parseCommitOptions(cmd);
                processor.processCommit(cmd);
              } else if (v.equals(UpdateRequestHandler.OPTIMIZE)) {
                CommitUpdateCommand cmd = new CommitUpdateCommand(req, true);
                cmd.waitSearcher = true;
                parseCommitOptions(cmd);
                processor.processCommit(cmd);
              } else if (v.equals(UpdateRequestHandler.DELETE)) {
                handleDeleteCommand();
              } else if (v.equals(UpdateRequestHandler.ROLLBACK)) {
                processor.processRollback(parseRollback());
              } else {
                throw new SolrException(
                    SolrException.ErrorCode.BAD_REQUEST,
                    "Unknown command '" + v + "' at [" + parser.getPosition() + "]");
              }
              break;
            }
            // fall through

          case JSONParser.LONG:
          case JSONParser.NUMBER:
          case JSONParser.BIGNUMBER:
          case JSONParser.BOOLEAN:
          case JSONParser.NULL:
            if (log.isInfoEnabled()) {
              log.info(
                  "Can't have a value here. Unexpected {} at [{}]",
                  JSONParser.getEventString(ev),
                  parser.getPosition());
            }

          case JSONParser.OBJECT_START:
          case JSONParser.OBJECT_END:
          case JSONParser.ARRAY_END:
            break;

          default:
            log.info("Noggit UNKNOWN_EVENT_ID: {}", ev);
            break;
        }
        // read the next event
        ev = parser.nextEvent();
      }
    }

    private void handleSplitMode(String split, String[] fields, final Reader reader)
        throws IOException {
      if (split == null) split = "/";
      if (fields == null || fields.length == 0) fields = new String[] {"$FQN:/**"};
      final boolean echo = "true".equals(req.getParams().get("echo"));
      final String srcField = req.getParams().get("srcField");
      final boolean mapUniqueKeyOnly = req.getParams().getBool("mapUniqueKeyOnly", false);
      if (srcField != null) {
        if (!"/".equals(split))
          throw new SolrException(
              SolrException.ErrorCode.BAD_REQUEST, "Raw data can be stored only if split=/");
        parser = new RecordingJSONParser(reader);
      } else {
        parser = new JSONParser(reader);
      }

      JsonRecordReader jsonRecordReader = JsonRecordReader.getInst(split, Arrays.asList(fields));
      jsonRecordReader.streamRecords(
          parser,
          new JsonRecordReader.Handler() {
            ArrayList<Map<String, Object>> docs = null;

            @Override
            public void handle(Map<String, Object> record, String path) {
              Map<String, Object> copy = getDocMap(record, parser, srcField, mapUniqueKeyOnly);

              if (echo) {
                if (docs == null) {
                  docs = new ArrayList<>();
                  rsp.add("docs", docs);
                }
                changeChildDoc(copy);
                docs.add(copy);
              } else {
                AddUpdateCommand cmd = new AddUpdateCommand(req);
                cmd.commitWithin = commitWithin;
                cmd.overwrite = overwrite;
                cmd.solrDoc = buildDoc(copy);
                try {
                  processor.processAdd(cmd);
                } catch (IOException e) {
                  throw new SolrException(
                      SolrException.ErrorCode.BAD_REQUEST, "Error inserting document: ", e);
                }
              }
            }
          });
    }

    private Map<String, Object> getDocMap(
        Map<String, Object> record, JSONParser parser, String srcField, boolean mapUniqueKeyOnly) {
      Map<String, Object> result = mapUniqueKeyOnly ? record : new LinkedHashMap<>(record);
      if (srcField != null && parser instanceof RecordingJSONParser rjp) {
        // if srcFIeld specified extract it out first
        result.put(srcField, rjp.getBuf());
        rjp.resetBuf();
      }
      if (mapUniqueKeyOnly) {
        SchemaField sf = req.getSchema().getUniqueKeyField();
        if (sf == null)
          throw new SolrException(
              SolrException.ErrorCode.BAD_REQUEST, "No uniqueKey specified in schema");
        String df = req.getParams().get(CommonParams.DF);
        if (df == null)
          throw new SolrException(
              SolrException.ErrorCode.BAD_REQUEST, "No 'df' specified in request");
        Map<String, Object> copy = new LinkedHashMap<>();
        String uniqueField = (String) record.get(sf.getName());
        if (uniqueField == null)
          uniqueField = UUID.randomUUID().toString().toLowerCase(Locale.ROOT);
        copy.put(sf.getName(), uniqueField);
        if (srcField != null && result.containsKey(srcField)) {
          copy.put(srcField, result.remove(srcField));
        }
        final List<Object> deepValues = new ArrayList<>();
        deepValues.addAll(result.values());
        copy.put(df, deepValues);
        result = copy;
      }

      return result;
    }

    //
    // "delete":"id"
    // "delete":["id1","id2"]
    // "delete":{"id":"foo"}
    // "delete":{"query":"myquery"}
    //
    void handleDeleteCommand() throws IOException {
      int ev = parser.nextEvent();
      switch (ev) {
        case JSONParser.ARRAY_START:
          handleDeleteArray(ev);
          break;
        case JSONParser.OBJECT_START:
          handleDeleteMap(ev);
          break;
        default:
          handleSingleDelete(ev);
      }
    }

    // returns the string value for a primitive value, or null for the null value
    String getString(int ev) throws IOException {
      switch (ev) {
        case JSONParser.STRING:
          return parser.getString();
        case JSONParser.BIGNUMBER:
        case JSONParser.NUMBER:
        case JSONParser.LONG:
          return parser.getNumberChars().toString();
        case JSONParser.BOOLEAN:
          return Boolean.toString(parser.getBoolean());
        case JSONParser.NULL:
          return null;
        default:
          throw new SolrException(
              SolrException.ErrorCode.BAD_REQUEST,
              "Expected primitive JSON value but got: "
                  + JSONParser.getEventString(ev)
                  + " at ["
                  + parser.getPosition()
                  + "]");
      }
    }

    void handleSingleDelete(int ev) throws IOException {
      if (ev == JSONParser.OBJECT_START) {
        handleDeleteMap(ev);
      } else {
        DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
        cmd.commitWithin = commitWithin;
        String id = getString(ev);
        cmd.setId(id);
        processor.processDelete(cmd);
      }
    }

    void handleDeleteArray(int ev) throws IOException {
      assert ev == JSONParser.ARRAY_START;
      for (; ; ) {
        ev = parser.nextEvent();
        if (ev == JSONParser.ARRAY_END) return;
        handleSingleDelete(ev);
      }
    }

    void handleDeleteMap(int ev) throws IOException {
      assert ev == JSONParser.OBJECT_START;

      DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
      cmd.commitWithin = commitWithin;

      while (true) {
        ev = parser.nextEvent();
        if (ev == JSONParser.STRING) {
          String key = parser.getString();
          if (parser.wasKey()) {
            if (ID.equals(key)) {
              cmd.setId(getString(parser.nextEvent()));
            } else if ("query".equals(key)) {
              cmd.setQuery(parser.getString());
            } else if ("commitWithin".equals(key)) {
              cmd.commitWithin = (int) parser.getLong();
            } else if (VERSION_FIELD.equals(key)) {
              cmd.setVersion(parser.getLong());
            } else if (_ROUTE_.equals(key)) {
              cmd.setRoute(parser.getString());
            } else {
              throw new SolrException(
                  SolrException.ErrorCode.BAD_REQUEST,
                  "Unknown key '" + key + "' at [" + parser.getPosition() + "]");
            }
          } else {
            throw new SolrException(
                SolrException.ErrorCode.BAD_REQUEST,
                "invalid string: " + key + " at [" + parser.getPosition() + "]");
          }
        } else if (ev == JSONParser.OBJECT_END) {
          if (cmd.getId() == null && cmd.getQuery() == null) {
            throw new SolrException(
                SolrException.ErrorCode.BAD_REQUEST,
                "Missing id or query for delete at [" + parser.getPosition() + "]");
          }

          processor.processDelete(cmd);
          return;
        } else {
          throw new SolrException(
              SolrException.ErrorCode.BAD_REQUEST,
              "Got: " + JSONParser.getEventString(ev) + " at [" + parser.getPosition() + "]");
        }
      }
    }

    RollbackUpdateCommand parseRollback() throws IOException {
      assertNextEvent(JSONParser.OBJECT_START);
      assertNextEvent(JSONParser.OBJECT_END);
      return new RollbackUpdateCommand(req);
    }

    void parseCommitOptions(CommitUpdateCommand cmd) throws IOException {
      assertNextEvent(JSONParser.OBJECT_START);
      @SuppressWarnings({"unchecked"})
      final Map<String, Object> map = (Map<String, Object>) ObjectBuilder.getVal(parser);

      // SolrParams currently expects string values...
      SolrParams p =
          new SolrParams() {
            @Override
            public String get(String param) {
              Object o = map.get(param);
              return o == null ? null : o.toString();
            }

            @Override
            public String[] getParams(String param) {
              return new String[] {get(param)};
            }

            @Override
            public Iterator<String> getParameterNamesIterator() {
              return map.keySet().iterator();
            }
          };

      RequestHandlerUtils.validateCommitParams(p);
      // default to the normal request params for commit options
      p = SolrParams.wrapDefaults(p, req.getParams());
      RequestHandlerUtils.updateCommit(cmd, p);
    }

    AddUpdateCommand parseAdd() throws IOException {
      AddUpdateCommand cmd = new AddUpdateCommand(req);
      cmd.commitWithin = commitWithin;
      cmd.overwrite = overwrite;

      while (true) {
        int ev = parser.nextEvent();
        if (ev == JSONParser.STRING) {
          if (parser.wasKey()) {
            String key = parser.getString();
            if ("doc".equals(key)) {
              if (cmd.solrDoc != null) {
                throw new SolrException(
                    SolrException.ErrorCode.BAD_REQUEST,
                    "Multiple documents in same"
                        + " add command at ["
                        + parser.getPosition()
                        + "]");
              }
              ev = assertNextEvent(JSONParser.OBJECT_START);
              cmd.solrDoc = parseDoc(ev);
            } else if (UpdateRequestHandler.OVERWRITE.equals(key)) {
              cmd.overwrite = parser.getBoolean(); // reads next boolean
            } else if (UpdateRequestHandler.COMMIT_WITHIN.equals(key)) {
              cmd.commitWithin = (int) parser.getLong();
            } else if ("boost".equals(key)) {
              String boost = parser.getNumberChars().toString();
              String message =
                  "Ignoring document boost: "
                      + boost
                      + " as index-time boosts are not supported anymore";
              if (WARNED_ABOUT_INDEX_TIME_BOOSTS.compareAndSet(false, true)) {
                log.warn(message);
              } else {
                log.debug(message);
              }
            } else {
              throw new SolrException(
                  SolrException.ErrorCode.BAD_REQUEST,
                  "Unknown key '" + key + "' at [" + parser.getPosition() + "]");
            }
          } else {
            throw new SolrException(
                SolrException.ErrorCode.BAD_REQUEST,
                "Should be a key " + " at [" + parser.getPosition() + "]");
          }
        } else if (ev == JSONParser.OBJECT_END) {
          if (cmd.solrDoc == null) {
            throw new SolrException(
                SolrException.ErrorCode.BAD_REQUEST,
                "Missing solr document at [" + parser.getPosition() + "]");
          }
          return cmd;
        } else {
          throw new SolrException(
              SolrException.ErrorCode.BAD_REQUEST,
              "Got: " + JSONParser.getEventString(ev) + " at [" + parser.getPosition() + "]");
        }
      }
    }

    void handleAdds() throws IOException {
      while (true) {
        AddUpdateCommand cmd = new AddUpdateCommand(req);
        cmd.commitWithin = commitWithin;
        cmd.overwrite = overwrite;

        int ev = parser.nextEvent();
        if (ev == JSONParser.ARRAY_END) break;

        assertEvent(ev, JSONParser.OBJECT_START);
        cmd.solrDoc = parseDoc(ev);
        processor.processAdd(cmd);
      }
    }

    int assertNextEvent(int expected) throws IOException {
      int got = parser.nextEvent();
      assertEvent(got, expected);
      return got;
    }

    void assertEvent(int ev, int expected) {
      if (ev != expected) {
        throw new SolrException(
            SolrException.ErrorCode.BAD_REQUEST,
            "Expected: "
                + JSONParser.getEventString(expected)
                + " but got "
                + JSONParser.getEventString(ev)
                + " at ["
                + parser.getPosition()
                + "]");
      }
    }

    private SolrInputDocument parseDoc(int ev) throws IOException {
      assert ev == JSONParser.OBJECT_START;

      SolrInputDocument sdoc = new SolrInputDocument();
      for (; ; ) {
        ev = parser.nextEvent();
        if (ev == JSONParser.OBJECT_END) {
          return sdoc;
        }
        String fieldName = parser.getString();

        if (fieldName.equals(JsonLoader.CHILD_DOC_KEY)) { // somewhat legacy
          ev = parser.nextEvent();
          assertEvent(ev, JSONParser.ARRAY_START);
          while ((ev = parser.nextEvent()) != JSONParser.ARRAY_END) {
            assertEvent(ev, JSONParser.OBJECT_START);

            sdoc.addChildDocument(parseDoc(ev));
          }
        } else {
          ev = parser.nextEvent();
          Object val = parseFieldValue(ev, fieldName);
          // SolrInputDocument.addField will do the right thing
          // if the doc already has another value for this field
          // (ie: repeating fieldname keys)
          sdoc.addField(fieldName, val);
        }
      }
    }

    private Object parseFieldValue(int ev, String fieldName) throws IOException {
      switch (ev) {
        case JSONParser.STRING:
          return parser.getString();
        case JSONParser.LONG:
          return parser.getLong();
        case JSONParser.NUMBER:
          return parser.getDouble();
        case JSONParser.BIGNUMBER:
          return parser.getNumberChars().toString();
        case JSONParser.BOOLEAN:
          return parser.getBoolean();
        case JSONParser.NULL:
          parser.getNull();
          return null;
        case JSONParser.ARRAY_START:
          return parseArrayFieldValue(ev, fieldName);
        case JSONParser.OBJECT_START:
          return parseObjectFieldValue(ev, fieldName);
        default:
          throw new SolrException(
              SolrException.ErrorCode.BAD_REQUEST,
              "Error parsing JSON field value. "
                  + "Unexpected "
                  + JSONParser.getEventString(ev)
                  + " at ["
                  + parser.getPosition()
                  + "], field="
                  + fieldName);
      }
    }

    private List<Object> parseArrayFieldValue(int ev, String fieldName) throws IOException {
      assert ev == JSONParser.ARRAY_START;

      ArrayList<Object> lst = new ArrayList<>(2);
      for (; ; ) {
        ev = parser.nextEvent();
        if (ev == JSONParser.ARRAY_END) {
          return lst;
        }
        lst.add(parseFieldValue(ev, fieldName));
      }
    }

    /** Parses this object as either a map for atomic update, or a child document. */
    private Object parseObjectFieldValue(int ev, String fieldName) throws IOException {
      assert ev == JSONParser.OBJECT_START;

      SolrInputDocument extendedSolrDocument = parseDoc(ev);
      // Is this a child doc (true) or a partial update (false)
      if (isChildDoc(extendedSolrDocument)) {
        return extendedSolrDocument;
      } else { // partial update: can include multiple modifiers (e.g. 'add', 'remove')
        Map<String, Object> map = CollectionUtil.newLinkedHashMap(extendedSolrDocument.size());
        for (SolrInputField inputField : extendedSolrDocument) {
          map.put(inputField.getName(), inputField.getValue());
        }
        return map;
      }
    }

    /** Is this a child doc (true) or a partial update (false)? */
    private boolean isChildDoc(SolrInputDocument extendedFieldValue) {
      IndexSchema schema = req.getSchema();
      // If schema doesn't support child docs, return false immediately, which
      // allows people to do atomic updates with multiple modifiers (like 'add'
      // and 'remove' for a single doc) and to do single-modifier updates for a
      // field with a name like 'set' that is defined in the schema, both of
      // which would otherwise fail.
      if (!schema.isUsableForChildDocs()) {
        return false;
      } else if (extendedFieldValue.size() != 1) {
        return true;
      }
      // if the only key is a field in the schema, assume it's a child doc
      final String fieldName = extendedFieldValue.iterator().next().getName();
      return schema.getFieldOrNull(fieldName) != null;
      // otherwise, assume it's "set" or some other verb for a partial update.
      // NOTE: it's fundamentally ambiguous with JSON; this is a best effort try.
    }
  }

  private static Object changeChildDoc(Object o) {
    if (o instanceof List) {
      return ((List<?>) o).stream().map(JsonLoader::changeChildDoc).collect(toList());
    }
    @SuppressWarnings("unchecked")
    Map<Object, Object> m = (Map<Object, Object>) o;
    if (m.containsKey(null)) m.put(CHILD_DOC_KEY, changeChildDoc(m.remove(null)));
    return m;
  }
}
